package com.omuao.message.websocket.service.impl;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.omuao.message.websocket.config.MqttProperties;
import com.omuao.message.websocket.enums.MqttActionType;
import com.omuao.message.websocket.error.ServiceException;
import com.omuao.message.websocket.facade.ClientSessionManager;
import com.omuao.message.websocket.facade.MqttWebSocketService;
import com.omuao.message.websocket.facade.QueueDataManager;
import com.omuao.message.websocket.facade.WebSocketMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

import java.io.IOException;
import java.util.Locale;
import java.util.Map;

/**
 * Handles MQTT broker hook events for WebSocket clients and delegates message
 * persistence to the {@link QueueDataManager}.
 * <p>
 * Intended flow when a message is consumed:
 * <ol>
 * <li>On connection, messages from RocketMQ are put into RocksDB.</li>
 * <li>The consumption state of a message decides whether it was consumed.</li>
 * <li>Once consumed, the message is removed from RocksDB.</li>
 * </ol>
 * Intended flow when a client disconnects:
 * <ol>
 * <li>On connection, messages from RocketMQ are put into RocksDB.</li>
 * <li>After the connection is lost, the message is sent again.</li>
 * </ol>
 *
 * @author omuao
 */
@Service
public class MqttWebSocketServiceImpl implements MqttWebSocketService {

    /** Class logger; visibility is left as declared so existing references keep compiling. */
    public static Logger logger = LoggerFactory.getLogger(MqttWebSocketServiceImpl.class);

    /** Registry of client ids that currently hold a subscription on their own topic. */
    @Autowired
    ClientSessionManager clientSessionManager;

    /** MQTT settings: super-client credentials and the per-client topic prefix. */
    @Autowired
    MqttProperties mqttProperties;

    /** Store used to query, insert, delete and re-send queued messages. */
    @Autowired
    QueueDataManager queueDataManager;

    /** JSON mapper used to decode message payloads. */
    @Autowired
    ObjectMapper objectMapper;

    /**
     * Dispatches a broker hook event to the matching handler.
     * <p>
     * Events are ignored when the map is {@code null}, when {@code clientid} or
     * {@code action} is missing/empty, when the event comes from the super
     * client, or when the action is not a known {@link MqttActionType}.
     *
     * @param map event attributes; the keys read here are {@code clientid},
     *            {@code action}, {@code topic} and {@code payload}
     * @throws ServiceException declared by the service interface
     */
    @Override
    public void processHookEvent(Map<String, Object> map) throws ServiceException {
        if (map == null) {
            logger.warn("Ignoring hook event: event map is null");
            return;
        }
        logger.info("{}", map);

        String clientId = stringValue(map, "clientid");
        if (!StringUtils.hasLength(clientId)) {
            return;
        }

        // Events produced by the super client are skipped.
        // clientId is the receiver so an unconfigured super client id cannot cause an NPE.
        if (clientId.equals(mqttProperties.getSuperClientId())) {
            return;
        }

        String action = stringValue(map, "action");
        if (!StringUtils.hasLength(action)) {
            return;
        }

        MqttActionType actionType;
        try {
            // Locale.ROOT keeps the upper-casing independent of the JVM default locale.
            actionType = MqttActionType.valueOf(action.toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            // Unknown actions are ignored, the same outcome as the switch's default branch.
            logger.warn("Ignoring hook event with unsupported action: {}", action);
            return;
        }

        switch (actionType) {
            case MESSAGE_ACKED:
                logger.info(">>>> 服务器已确认");
                break;
            case MESSAGE_PUBLISH:
                logger.info(">>>> 发送消息回调");
                break;
            case CLIENT_CONNECTED:
                logger.info(">>>> 客户端建立连接");
                this.connected(clientId);
                break;
            case CLIENT_SUBSCRIBE:
                logger.info(">>>> 客户订阅消息");
                break;
            case MESSAGE_DELIVERED:
                logger.info(">>>> 消息被消费");
                // The delivered message is removed from the queue store.
                this.delivered(clientId, map);
                break;
            case CLIENT_UNSUBSCRIBE:
                logger.info(">>>> 客户取消订阅消息");
                break;
            case SESSION_SUBSCRIBED:
                logger.info(">>>> 客户订阅消息完成");
                this.subscribed(clientId, map);
                break;
            case CLIENT_DISCONNECTED:
                logger.info(">>>> 客户断开连接");
                this.disconnected(clientId);
                break;
            case SESSION_UNSUBSCRIBED:
                logger.info(">>>> 客户取消订阅消息完成");
                this.unsubscribed(clientId, map);
                break;
            default:
                break;
        }
    }

    /**
     * Removes a message from the queue store once it has been delivered.
     * <p>
     * Nothing happens when the payload is empty, the topic is not the client's
     * own topic, the payload cannot be parsed, or the store has no such message.
     *
     * @param clientId   client id
     * @param eventInfos event attributes ({@code payload} and {@code topic} are read)
     */
    private void delivered(String clientId, Map<String, Object> eventInfos) {
        String message = stringValue(eventInfos, "payload");
        if (!StringUtils.hasLength(message)) {
            return;
        }

        if (!isClientTopic(clientId, stringValue(eventInfos, "topic"))) {
            return;
        }

        WebSocketMessage webSocketMessage;
        try {
            webSocketMessage = objectMapper.readValue(message, WebSocketMessage.class);
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
            // Without a parsed message there is nothing to look up or delete.
            return;
        }
        if (webSocketMessage == null) {
            // A JSON "null" payload deserialises to null.
            return;
        }

        WebSocketMessage msg = queueDataManager.query(webSocketMessage);
        if (msg == null) {
            return;
        }
        logger.info("客户端已消费消息：{}", clientId);
        queueDataManager.delete(webSocketMessage);
    }

    /**
     * Authenticates a connecting client.
     *
     * @param username user name presented by the client
     * @param password password presented by the client
     * @param clientId client id presented by the client
     * @return currently always {@code true}
     */
    @Override
    public boolean auth(String username, String password, String clientId) {
        // Super client
        if (isSuperClient(username, password, clientId)) {
            return true;
        }

        // Other clients
        // NOTE(review): every client is accepted without any credential check;
        // confirm this is intentional before exposing the broker.
        return true;
    }

    /**
     * Tells whether the credentials match the configured super client. A
     * {@code null} argument or an unconfigured property never matches.
     */
    private boolean isSuperClient(String username, String password, String clientId) {
        return username != null && username.equals(mqttProperties.getSuperUsername())
                && password != null && password.equals(mqttProperties.getSuperPassword())
                && clientId != null && clientId.equals(mqttProperties.getSuperClientId());
    }

    /**
     * Handles a newly established connection.
     *
     * @param clientId client id
     */
    private void connected(String clientId) {
        if (!clientSessionManager.contains(clientId)) {
            // TODO: increment the online counter in Redis.
            logger.info("ID {} 上线！", clientId);
        }
    }

    /**
     * Handles a closed connection: drops the client's registered subscription.
     *
     * @param receiverId receiver (client) id
     */
    private void disconnected(String receiverId) {
        if (!clientSessionManager.contains(receiverId)) {
            return;
        }
        // TODO: decrement the online counter in Redis.
        logger.info("ID {} 下线！", receiverId);

        // Remove the subscription.
        this.removeSubscribe(receiverId);
    }

    /**
     * Handles a completed unsubscribe; only the client's own topic is relevant.
     *
     * @param clientId client id
     * @param map      event attributes ({@code topic} is read)
     */
    private void unsubscribed(String clientId, Map<String, Object> map) {
        if (!isClientTopic(clientId, stringValue(map, "topic"))) {
            return;
        }
        this.removeSubscribe(clientId);
    }

    /**
     * Handles a completed subscribe: registers the client if it subscribed to
     * its own topic and is not registered yet.
     *
     * @param clientId client id
     * @param map      event attributes ({@code topic} is read)
     */
    private void subscribed(String clientId, Map<String, Object> map) {
        if (!isClientTopic(clientId, stringValue(map, "topic"))) {
            return;
        }

        if (clientSessionManager.contains(clientId)) {
            return;
        }

        clientSessionManager.put(clientId);
    }

    /**
     * Removes the subscription registration of a client, if present.
     *
     * @param receiverId subscriber id
     */
    private void removeSubscribe(String receiverId) {
        if (clientSessionManager.contains(receiverId)) {
            clientSessionManager.remove(receiverId);
        }
    }

    /**
     * Tells whether {@code topic} starts with the configured topic prefix
     * followed by the client id.
     */
    private boolean isClientTopic(String clientId, String topic) {
        return topic.startsWith(mqttProperties.getTopicPrefix() + clientId);
    }

    /**
     * Reads an event attribute as a string.
     *
     * @return the value's string form, or an empty string when the key is
     *         absent or mapped to {@code null}
     */
    private static String stringValue(Map<String, Object> source, String key) {
        Object value = source.get(key);
        return value == null ? "" : value.toString();
    }

    /**
     * Delegates to {@link QueueDataManager#sendOfflineMessage()}.
     *
     * @throws ServiceException propagated from the queue store
     */
    @Override
    public void sendOfflineMessage() throws ServiceException {
        queueDataManager.sendOfflineMessage();
    }

    /**
     * Delegates to the queue store to persist a message.
     *
     * @param webSocketMessage message to store
     * @throws ServiceException propagated from the queue store
     */
    @Override
    public void insert(WebSocketMessage webSocketMessage) throws ServiceException {
        queueDataManager.insert(webSocketMessage);
    }
}
